/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk;



import com.bff.gaia.unified.sdk.io.FileSystems;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.sdk.options.PipelineOptionsFactory;

import com.bff.gaia.unified.sdk.options.PipelineOptionsValidator;

import com.bff.gaia.unified.sdk.transforms.PTransform;

import com.bff.gaia.unified.sdk.util.InstanceBuilder;

import com.bff.gaia.unified.sdk.values.PBegin;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkNotNull;



/**
 * Base class for anything that can execute a {@link Pipeline}.
 *
 * <p>Subclasses supply {@link #run(Pipeline)}; the static factories on this class build a runner
 * instance from {@link PipelineOptions}.
 *
 * @param <ResultT> the kind of {@link PipelineResult} handed back by {@link #run}, often a handle
 *     to a running job.
 */
public abstract class PipelineRunner<ResultT extends PipelineResult> {

  /**
   * Builds the runner selected by the supplied {@link PipelineOptions}.
   *
   * <p>Before the runner is instantiated, the options are validated against {@link
   * PipelineOptions} and installed as the default pipeline options of {@link FileSystems}. The
   * instance itself is requested from {@link InstanceBuilder}, using the class reported by {@link
   * PipelineOptions#getRunner()} and its {@code fromOptions} factory method.
   *
   * @param options the options describing which runner to build; must not be null.
   * @return The newly created runner.
   */
  public static PipelineRunner<? extends PipelineResult> fromOptions(PipelineOptions options) {
    checkNotNull(options);
    PipelineOptionsValidator.validate(PipelineOptions.class, options);

    // Standard FileSystems get (re-)registered here, which clobbers any prior credentials.
    FileSystems.setDefaultPipelineOptions(options);

    // The builder is typed on the raw PipelineRunner class, so the assignment below is an
    // unchecked conversion; the suppression is kept on this one local only.
    @SuppressWarnings("unchecked")
    PipelineRunner<? extends PipelineResult> runner =
        InstanceBuilder.ofType(PipelineRunner.class)
            .fromClass(options.getRunner())
            .fromFactoryMethod("fromOptions")
            .withArg(PipelineOptions.class, options)
            .build();
    return runner;
  }

  /**
   * Builds a runner using the options produced by {@link PipelineOptionsFactory#create()}.
   *
   * @return The newly created runner.
   */
  public static PipelineRunner<? extends PipelineResult> create() {
    PipelineOptions defaultOptions = PipelineOptionsFactory.create();
    return fromOptions(defaultOptions);
  }

  /**
   * Executes the given {@link Pipeline}, possibly asynchronously.
   *
   * @param pipeline the pipeline to process.
   * @return a runner-specific type of result.
   */
  public abstract ResultT run(Pipeline pipeline);

  /**
   * Wraps a single {@link PTransform} step in a new {@link Pipeline} and executes that pipeline.
   *
   * @param pTransform the root transform, applied to a pipeline created from {@code options}.
   * @param options the options used to create the pipeline.
   * @return the result of running the single-step pipeline via {@link #run(Pipeline)}.
   */
  public ResultT run(PTransform<PBegin, ?> pTransform, PipelineOptions options) {
    Pipeline singleStepPipeline = Pipeline.create(options);
    singleStepPipeline.apply(pTransform);
    return run(singleStepPipeline);
  }

  /**
   * Convenience overload of {@link #run(PTransform, PipelineOptions)} that uses the options
   * produced by {@link PipelineOptionsFactory#create()}.
   *
   * @param pTransform the root transform to execute.
   * @return the result of running the single-step pipeline.
   */
  public ResultT run(PTransform<PBegin, ?> pTransform) {
    PipelineOptions defaultOptions = PipelineOptionsFactory.create();
    return run(pTransform, defaultOptions);
  }
}